package com.retry.task.admin.infrastructure.scheduler;

import com.google.common.collect.Maps;
import com.retry.task.admin.constants.SystemLocks;
import com.retry.task.admin.infrastructure.lock.ILock;
import com.retry.task.admin.infrastructure.selector.ISelectExector;
import com.retry.task.admin.infrastructure.selector.SelectorManager;
import com.retry.task.admin.biz.service.RetryTaskGroupBizService;
import com.retry.task.admin.biz.service.RetryTaskLogBizService;
import com.retry.task.admin.biz.service.RetryTaskProjectBizService;
import com.retry.task.admin.biz.service.RetryTaskBizService;
import com.retry.task.admin.dal.model.RetryTaskGroupDO;
import com.retry.task.admin.dal.model.query.RetryTaskGroupQuery;
import com.retry.task.admin.dal.model.query.RetryTaskQuery;
import com.retry.task.core.constants.RetryTaskStatusEnum;
import com.retry.task.core.constants.RetryTaskURIEnum;
import com.retry.task.core.constants.SelectorExectorEnum;
import com.retry.task.core.exception.ExceptionCode;
import com.retry.task.core.executor.AbstractExecutor;
import com.retry.task.core.model.RetryTaskContext;
import com.retry.task.core.model.RetryTaskDTO;
import com.retry.task.core.model.RetryTaskLogDTO;
import com.retry.task.core.model.base.PageResult;
import com.retry.task.core.model.base.Result;
import com.retry.task.core.utils.GsonTool;
import com.retry.task.core.utils.HttpClientUtils;
import com.retry.task.sequence.Sequence;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * @author gao.gwq
 * @version 1.0
 * @date 2022/4/19  20:39
 * @Description TODO
 */
@Component
public class RetryTaskScheduler implements InitializingBean, IRetryTaskScheduler {

    private static final Logger LOGGER = LoggerFactory.getLogger(RetryTaskScheduler.class);

    private static ScheduledExecutorService scheduledExecutorService;

    @Autowired
    private ILock mysqlLock;
    @Autowired
    @Qualifier("retryTaskLogSequence")
    private Sequence retryTaskLogSequence;

    @Autowired
    private RetryTaskBizService retryTaskService;

    @Autowired
    private RetryTaskLogBizService retryTaskLogService;

    @Autowired
    private RetryTaskGroupBizService retryTaskGroupService;

    @Autowired
    private RetryTaskProjectBizService retryTaskProjectService;

    @Autowired
    private AbstractExecutor serverExecutorImpl;

    private volatile long suspendTime = 0L;

    @Value("${envirment.isTest}")
    private Integer isTest;
    /**
     * 是否挂起
     */
    private boolean suspendSchedulerFlag = false;

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                if (scheduledExecutorService != null) {
                    if (!scheduledExecutorService.isShutdown()) {
                        scheduledExecutorService.shutdown();
                    }
                }
            }
        }));
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        startScheduler();
    }

    @Override
    public void stopScheduler() {
        if (scheduledExecutorService == null) {
            return;
        }
        scheduledExecutorService.shutdown();
    }

    @Override
    public boolean reStartScheduler() {
        suspendSchedulerFlag = false;
        return true;
    }

    @Override
    public boolean suspendScheduler(Long seconds) {
        if (seconds == null) {
            return false;
        }
        if (seconds.intValue() <= 0) {
            return false;
        }
        suspendTime = System.currentTimeMillis() + seconds * 1000;
        return true;
    }

    @Override
    public boolean startScheduler() {
        initScheduler();
        return true;
    }

    private void initScheduler() {
        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        scheduledExecutorService.scheduleWithFixedDelay(new SchedulerRunable(), 20, 10, TimeUnit.SECONDS);
    }

    protected void processTask(RetryTaskQuery query, Consumer<List<RetryTaskDTO>> consumer) {
        int pageNum = 1;
        while (true) {
            query.setPageNum(pageNum);
            PageResult<List<RetryTaskDTO>> pageResultDTO = retryTaskService.queryRetryTask(query);
            if (!pageResultDTO.isSuccess()) {
                LOGGER.error("[[RetryTaskProcessor-queryTetryTask]] query retry task error,{}",
                    pageResultDTO.getErrMsg());
                break;
            }
            List<RetryTaskDTO> retryTaskDTOS = pageResultDTO.getData();
            if (CollectionUtils.isEmpty(retryTaskDTOS)) {
                break;
            }
            consumer.accept(retryTaskDTOS);
            //sendRetryTaskToMetaq(retryTaskDTOS);
            if (retryTaskDTOS.size() < query.getPageSize()) {
                break;
            }
            pageNum++;
        }
    }

    private RetryTaskContext buildRetryTaskContext(RetryTaskDTO retryTaskDTO) {
        RetryTaskContext retryTaskContext = new RetryTaskContext();
        retryTaskContext.setTaskId(retryTaskDTO.getId());
        retryTaskContext.setTaskName(retryTaskDTO.getTaskName());
        retryTaskContext.setTaskDesc(retryTaskDTO.getTaskDesc());
        retryTaskContext.setProjectName(retryTaskDTO.getProjectName());
        retryTaskContext.setParameters(retryTaskDTO.getParameters());
        retryTaskContext.setIsTest(retryTaskDTO.getIsTest());
        retryTaskContext.setToken(retryTaskDTO.getToken());
        retryTaskContext.setInstanceId(retryTaskLogSequence.nextValue());
        return retryTaskContext;
    }

    public void sendRetryTask(List<RetryTaskDTO> retryTaskDTOS) {
        Map<String, List<RetryTaskGroupDO>> groupListMap = Maps.newHashMap();
        Map<String, String> tokenMap = Maps.newHashMap();
        for (RetryTaskDTO taskDTO : retryTaskDTOS) {
            String traceId = UUID.randomUUID().toString();
            try {
                MDC.put("TRACE_ID", traceId);
                String projectName = taskDTO.getProjectName();
                RetryTaskContext retryTaskContext = buildRetryTaskContext(taskDTO);
                retryTaskContext.setTraceId(traceId);
                retryTaskLogService.updateTaskAndCreateLog(taskDTO, traceId,
                    retryTaskContext.getInstanceId());
                List<RetryTaskGroupDO> groupDOList = groupListMap.get(projectName);
                if (CollectionUtils.isEmpty(groupDOList)) {
                    groupDOList = getRetryTaskGroupDOS(taskDTO);
                }
                if (CollectionUtils.isEmpty(groupDOList)) {
                    LOGGER.error("retry-task project {},isTest {} is not register", projectName, isTest);
                    continue;
                }
                String token = tokenMap.get(projectName);
                if (token == null) {
                    Result<String> resultDTO = retryTaskProjectService.getRetryTaskProjectToken(projectName,
                        taskDTO.getIsTest());
                    if (resultDTO.getData() == null) {
                        LOGGER.error("retry-task project {},isTest {} is not register", projectName, isTest);
                    }
                    token = resultDTO.getData();
                    tokenMap.put(projectName, token);
                }
                retryTaskContext.setToken(token);
                groupListMap.put(taskDTO.getProjectName(), groupDOList);
                RetryTaskGroupDO retryTaskGroupDO = getRetryTaskGroup(taskDTO, groupDOList);
                doSendRetryTask(retryTaskContext, retryTaskGroupDO);
            } finally {
                MDC.remove("TRACE_ID");
            }
        }
    }

    private List<RetryTaskGroupDO> getRetryTaskGroupDOS(RetryTaskDTO taskDTO) {
        List<RetryTaskGroupDO> groupDOList;
        RetryTaskGroupQuery query = new RetryTaskGroupQuery();
        query.setProjectName(taskDTO.getProjectName());
        query.setIsTest(isTest);
        query.setPageNum(1);
        query.setPageSize(200);
        query.setStatus(1);
        groupDOList = retryTaskGroupService.queryByRetryTaskGroups(query);
        return groupDOList;
    }

    private RetryTaskGroupDO getRetryTaskGroup(RetryTaskDTO taskDTO, List<RetryTaskGroupDO> groupDOList) {
        String selectExectorType = taskDTO.getSelectorType();
        if (selectExectorType == null) {
            selectExectorType = SelectorExectorEnum.LAST_HEART.toString();
        }
        ISelectExector iSelectExector = SelectorManager.getSelector(selectExectorType);

        RetryTaskGroupDO retryTaskGroupDO = iSelectExector.bestExectorSelect(groupDOList);
        return retryTaskGroupDO;
    }

    private void doSendRetryTask(RetryTaskContext retryTaskContext, RetryTaskGroupDO retryTaskGroupDO) {

        String realUri = retryTaskGroupDO.getExecuteIp() + RetryTaskURIEnum.RUN.getUri();

        Result resultDTO = HttpClientUtils.postBody(realUri, retryTaskContext,
            String.class, retryTaskContext.getTraceId());
        LOGGER.info("retry-task 发送任务={},发送结果={}", GsonTool.toJsonStringIgnoreNull(retryTaskContext),
            GsonTool.toJsonStringIgnoreNull(resultDTO));
        if (!resultDTO.isSuccess() && StringUtils.equals(resultDTO.getCode(),
            ExceptionCode.CAN_NOT_FIND_HEART_TICK)) {
            updateTaskLogNoHeartTick(retryTaskContext, resultDTO);
        }
        if (!resultDTO.isSuccess() && StringUtils.equals(resultDTO.getCode(),
            ExceptionCode.CONNECTION_EXCEPTION)) {
            updateTaskLogNoHeartTick(retryTaskContext, resultDTO);
        }
    }

    /**
     * 如果没有心跳返回的错误，则更新日志
     *
     * @param retryTaskContext
     * @param resultDTO
     */
    private void updateTaskLogNoHeartTick(RetryTaskContext retryTaskContext, Result resultDTO) {
        RetryTaskLogDTO retryTaskLogDTO = new RetryTaskLogDTO();
        retryTaskLogDTO.setTaskId(retryTaskContext.getTaskId());
        retryTaskLogDTO.setId(retryTaskContext.getInstanceId());
        retryTaskLogDTO.setIsTest(retryTaskLogDTO.getIsTest());
        retryTaskLogDTO.setStatus(RetryTaskStatusEnum.FAIL.getCode());
        retryTaskLogDTO.setType(1);
        retryTaskLogDTO.setIsTest(retryTaskContext.getIsTest());
        retryTaskLogDTO.setErrorMessage(resultDTO.getErrMsg());
        retryTaskLogDTO.setEndTime(new Date());
        retryTaskLogDTO.setToken(retryTaskContext.getToken());
        retryTaskLogDTO.setProjectName(retryTaskContext.getProjectName());
        serverExecutorImpl.updateRetryTaskLog(retryTaskLogDTO);
    }

    private RetryTaskQuery buildRetryTaskQuery(Map<String, String> paramsMap) {
        RetryTaskQuery query = new RetryTaskQuery();
        int pageSize = 300;
        String idListStr = paramsMap.get("idList");
        if (StringUtils.isNotEmpty(idListStr)) {
            String[] idListArr = idListStr.split(",");
            List<Long> idList = Arrays.stream(idListArr).map(id -> Long.valueOf(id)).collect(Collectors.toList());
            query.setIdList(idList);
        }
        String pageSizeStr = paramsMap.get("pageSize");
        if (StringUtils.isNotEmpty(pageSizeStr)) {
            Integer pageSizeInteger = Integer.valueOf(pageSizeStr);
            if (pageSizeInteger > 300) {
                pageSizeInteger = 300;
            }
            pageSize = pageSizeInteger;
        }
        String minRetryNumStr = paramsMap.get("minRetryNum");
        Integer minRetryNum = 1;
        if (StringUtils.isNotEmpty(minRetryNumStr)) {
            minRetryNum = Integer.valueOf(minRetryNumStr);
        }
        query.setMinRetryNum(minRetryNum);
        String maxRetryNumStr = paramsMap.get("minRetryNum");
        if (StringUtils.isNotEmpty(maxRetryNumStr)) {
            Integer maxRetryNum = Integer.valueOf(maxRetryNumStr);
            query.setMaxRetryNum(maxRetryNum);
        }
        query.setPageNum(pageSize);
        query.setStatus(RetryTaskStatusEnum.TO_START.getCode());
        query.setMaxNextPlanTime(new Date());
        query.setIsTest(isTest);
        return query;
    }

    private class SchedulerRunable implements Runnable {
        @Override
        public void run() {
            //如果已经挂起，则判断挂起时间是否已经过 ，如果超过进入下面的流程，如果没有则
            //直接返回
            if (suspendSchedulerFlag == true) {
                long currentTime = System.currentTimeMillis();
                if (currentTime < suspendTime) {
                    return;
                } else {
                    suspendSchedulerFlag = false;
                    suspendTime = 0L;
                }
            }
            boolean lockFlag = mysqlLock.tryLock(SystemLocks.RETRY_TASK_PUBLISH_LOCK, isTest);
            if (lockFlag == false) {
                return;
            }
            try {

                Map<String, String> paramsMap = Maps.newHashMap();

                RetryTaskQuery query = buildRetryTaskQuery(paramsMap);
                processTask(query, retryTaskDTOS -> {
                    sendRetryTask(retryTaskDTOS);
                });
            } catch (Exception ex) {
                LOGGER.error("retry-task send retry task error {}", ex.getMessage(), ex);
            } finally {
                mysqlLock.releasLock(SystemLocks.RETRY_TASK_PUBLISH_LOCK, isTest);
            }

        }
    }

}
